package com.jiufengxinxi.ts.common.utils.socket.client;

import com.jiufengxinxi.ts.common.utils.HexUtil;
import com.jiufengxinxi.ts.common.utils.socket.ISocketRequestHandle;
import com.jiufengxinxi.ts.common.utils.thread.ExecutorServiceUtils;
import com.jiufengxinxi.ts.common.utils.thread.LoopThreadRunnable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.*;

/**
 * TCP client for a single {@code host:port} endpoint.
 *
 * <p>{@link #connection()} opens the socket and hands a read loop to
 * {@code ExecutorServiceUtils}; whatever that loop reads is either forwarded to the
 * configured {@link ISocketRequestHandle} or, while a synchronous {@link #send(String)}
 * is waiting, stored as the reply of that call.
 *
 * <p>The fields shared between the sending thread and the read loop are {@code volatile}
 * so updates are visible across threads. Concurrent synchronous sends on one instance
 * are still not supported: there is only one reply slot.
 */
public class MultiThreadClient {

	/** Interval, in milliseconds, at which a synchronous send polls for the reply. */
	private static final long RESPONSE_POLL_MILLIS = 2000L;

	private final Logger logger=LoggerFactory.getLogger(this.getClass());

	/** Monitor guarding connection establishment for this instance. */
	private final Object connectLock = new Object();

	private Socket s = null;
	private DataOutputStream dos = null;
	private DataInputStream dis = null;
	private int port;
	private ISocketRequestHandle socketRequestHandle;

	private String host;
	/** Written by connect/disconnect, read by senders and the read loop. */
	private volatile boolean connected;
	private String name;
	private String characterSet="UTF-8";

	/** Optional delimiter appended after every outgoing message. */
	private String splitChart=null;

	/** When true, {@link #send(String)} waits for a reply. */
	private boolean syn=false;

	/** Reply timeout of a synchronous send, in seconds. */
	private int timeout=30;

	/** When true, the connection is closed after every synchronous send. */
	private boolean close4sended=false;

	/** Reply slot filled by the read loop and consumed by a synchronous send. */
	private volatile String reviceContent;

	/** True while a synchronous send is waiting for its reply. */
	private volatile boolean issending=false;

	private ICharFilter charFilter = new CharacaterFilter();

	/** True while a connection attempt is in progress; concurrent attempts return at once. */
	private volatile boolean locked = false;

	private String localIp;

	/**
	 * Creates an asynchronous client (sends do not wait for a reply).
	 *
	 * @param host remote host name or address
	 * @param port remote TCP port
	 */
	public MultiThreadClient(String host,int port){
		this(host,port,false);
	}

	/**
	 * Creates a client. No connection is opened until the first send or an explicit
	 * {@link #connection()} call.
	 *
	 * @param host remote host name or address
	 * @param port remote TCP port
	 * @param syn  whether {@link #send(String)} waits for a reply
	 */
	public MultiThreadClient(String host,int port,boolean syn){
		this.port=port;
		this.host=host;
		this.syn=syn;
		charFilter.setCharacater(characterSet);
	}

	/**
	 * Writes raw bytes, followed by the delimiter if one is set, connecting first when
	 * needed. Any failure is logged and the connection is closed; nothing is thrown.
	 *
	 * @param bytes payload to write
	 */
	public void send(byte[] bytes) {
		try {
			if(!connected){
				connection();
			}
			if(dos!=null) {
				dos.write(bytes);
				if(splitChart!=null) {
					// Encode the delimiter with the configured charset, like send(String),
					// instead of the platform default.
					dos.write(splitChart.getBytes(characterSet));
				}
				dos.flush();
			}else {
				logger.error(this.host+":"+this.port+"    未连接:"+host+":"+port);
			}
		}catch(Exception e) {
			logger.error(this.host+":"+this.port+"    与服务主机连接过程出错，发送失败",e);
			disconnect();
		}
	}

	/**
	 * Sends a string (plus the delimiter, if set) encoded with the configured charset.
	 *
	 * @param str text to send
	 * @return in asynchronous mode {@code "ok"}; in synchronous mode the reply, or
	 *         {@code null} when none arrived in time; {@code "ok"} as well when no
	 *         connection could be established; {@code "false"} when sending failed
	 */
	public String send(String str) {
		boolean armed = false;
		try {
			if(!connected){
				connection();
			}
			if(!isConnected()) {
				return "ok";
			}
			if (syn) {
				// Arm the exchange BEFORE writing: otherwise a fast reply could be routed
				// to the handler, and stale content could be mistaken for the reply.
				reviceContent = null;
				issending = true;
				armed = true;
			}
			if (dos != null) {
				String payload = StringUtils.isNotEmpty(splitChart) ? str + splitChart : str;
				dos.write(payload.getBytes(characterSet));
				dos.flush();
			} else {
				logger.error(this.host + ":" + this.port + "    未连接:" + host + ":" + port);
			}
			if (!armed) {
				return "ok";
			}
			return awaitResponse();
		} catch(Exception e) {
			if (armed) {
				issending = false;
				reviceContent = null;
			}
			logger.error(this.host+":"+this.port+"    与服务主机连接过程出错，发送失败",e);
			disconnect();
		}

		return "false";
	}

	/**
	 * Waits up to {@code timeout} seconds for the read loop to fill the reply slot.
	 * Always releases the helper thread and resets the synchronous-send state, then
	 * disconnects when {@code close4sended} is set or the wait failed abnormally.
	 *
	 * @return the reply, or {@code null} on timeout, interruption or failure
	 */
	private String awaitResponse() {
		ExecutorService exec = Executors.newSingleThreadExecutor();
		String response = null;
		boolean mustClose = close4sended;
		try {
			Future<String> future = exec.submit(new Callable<String>() {
				@Override
				public String call() throws Exception {
					// NOTE(review): the reply slot is overwritten (not appended) by each
					// read, so this interval also decides which chunk is returned; kept
					// at its original value to preserve that behavior.
					while (reviceContent == null) {
						Thread.sleep(RESPONSE_POLL_MILLIS);
					}
					String temp = reviceContent;
					reviceContent = null;
					return temp;
				}
			});
			response = future.get(timeout, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers up the stack can observe it.
			Thread.currentThread().interrupt();
			logger.warn("调用服务异常", e);
			mustClose = true;
		} catch (ExecutionException e) {
			logger.warn("调用服务异常", e);
			mustClose = true;
		} catch (TimeoutException e) {
			logger.warn("调用服务超时", e);
		} finally {
			// Interrupts the poller (its sleep throws) so no thread outlives this call.
			exec.shutdownNow();
			issending = false;
			reviceContent = null;
		}
		if (mustClose) {
			disconnect();
		}
		return response;
	}


	/**
	 * Opens the socket, notifies the handler and starts the read loop. Returns
	 * immediately when another attempt is in progress or the client is already
	 * connected. Failures are handled by calling {@link #disconnect()}.
	 */
	public void connection() {
		if (locked) {
			return;
		}
		// A dedicated monitor is required: a String built by concatenation is a new
		// object on every call and therefore excludes nobody.
		synchronized (connectLock) {
			if (connected) {
				// Another thread connected while this one waited for the monitor.
				return;
			}
			try {
				locked = true;
				s = new Socket(host, port);
				connected = s.isConnected();
				if (connected) {
					logger.info("~~~~~~~~连接成功~~~~~~~~!" + host + ":" + port);
					this.localIp = s.getLocalAddress().getHostAddress();
					dos = new DataOutputStream(s.getOutputStream());
					dis = new DataInputStream(s.getInputStream());

					if (socketRequestHandle != null) {
						socketRequestHandle.connection(host + ":" + port);
					}

					ExecutorServiceUtils.execute(new LoopThreadRunnable() {

						// NOTE(review): returns true while connected; whether that means
						// "keep looping" or "stop" depends on LoopThreadRunnable, which
						// is not visible here. Left unchanged; please confirm.
						@Override
						public boolean endCondition() {
							return connected;
						}

						@Override
						public void loopElement() {
							try {
								String str = readFromInStream(dis);
								if (!issending && StringUtils.isNotEmpty(str) && socketRequestHandle != null) {
									socketRequestHandle.request(str, name);
								} else {
									reviceContent = str;
								}
							} catch (IOException e) {
								// Includes end-of-stream: the peer closed the connection.
								disconnect();
							} catch (Exception e) {
								logger.error(host + ":" + port + "    处理接收数据时出错", e);
							}
						}

						@Override
						public int sleepTime() {
							return 0;
						}

						@Override
						public String getName() {
							return name;
						}

						@Override
						public boolean isUseLock() {
							return false;
						}
					});
				}

			} catch (UnknownHostException e) {
				logger.error(this.host + ":" + this.port + "    目标找不到主机", e);
				disconnect();
			} catch (IOException e) {
				// Kept below error level: the original deliberately did not log this case.
				logger.debug(this.host + ":" + this.port + "    与服务主机连接过程出错", e);
				disconnect();
			} finally {
				locked = false;
			}
		}

	}


	/**
	 * Reads one message: keeps reading while each read fills the whole 1024-byte
	 * buffer, then converts the collected bytes with the configured filter.
	 *
	 * <p>NOTE(review): there is no real framing; a message whose length is an exact
	 * multiple of the buffer size makes this wait for further data.
	 *
	 * @param dis stream to read from
	 * @return the converted message
	 * @throws EOFException if the stream ended before any byte was read
	 * @throws IOException  if reading fails
	 */
	public String readFromInStream(DataInputStream dis) throws IOException{
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		byte[] a = new byte[1024];
		int len;
		while((len = dis.read(a))>=a.length){
			bos.write(a,0,len);
		}
		if(len>0){
			bos.write(a,0,len);
		} else if (len < 0 && bos.size() == 0) {
			// read() reports end-of-stream with -1. Surface it so the read loop
			// disconnects instead of delivering empty messages forever.
			throw new EOFException(this.host + ":" + this.port + "    连接已被对端关闭");
		}

		return bos.size()>0?charFilter.change(bos.toByteArray()):"";
	}

	/**
	 * Marks the client disconnected, closes both streams and the socket (each one
	 * independently) and notifies the handler.
	 */
	public void disconnect() {
		connected = false;
		closeQuietly(dis);
		closeQuietly(dos);
		closeQuietly(s);

		if(socketRequestHandle!=null) {
			socketRequestHandle.disconnection(this.host+":"+this.port);
		}

	}

	/** Closes one resource, logging instead of propagating any failure. */
	private void closeQuietly(Closeable resource) {
		if (resource == null) {
			return;
		}
		try {
			resource.close();
		} catch (Exception e) {
			logger.error(this.host+":"+this.port+"    断开过程出现异常", e);
		}
	}


	public Socket getS() {
		return s;
	}


	public void setS(Socket s) {
		this.s = s;
	}


	public DataOutputStream getDos() {
		return dos;
	}


	public void setDos(DataOutputStream dos) {
		this.dos = dos;
	}


	public DataInputStream getDis() {
		return dis;
	}


	public void setDis(DataInputStream dis) {
		this.dis = dis;
	}


	public int getPort() {
		return port;
	}


	public void setPort(int port) {
		this.port = port;
	}


	public String getHost() {
		return host;
	}


	public void setHost(String host) {
		this.host = host;
	}


	public boolean isConnected() {
		return connected;
	}


	public void setConnected(boolean connected) {
		this.connected = connected;
	}


	public ISocketRequestHandle getSocketRequestHandle() {
		return socketRequestHandle;
	}


	public void setSocketRequestHandle(ISocketRequestHandle socketRequestHandle) {
		this.socketRequestHandle = socketRequestHandle;
	}


	public String getName() {
		return name;
	}


	public void setName(String name) {
		this.name = name;
	}

	public String getCharacterSet() {
		return characterSet;
	}

	/** Sets the charset used for outgoing text and forwards it to the current filter. */
	public void setCharacterSet(String characterSet) {
		this.characterSet = characterSet;
		charFilter.setCharacater(characterSet);
	}

	public boolean isSyn() {
		return syn;
	}

	public void setSyn(boolean syn) {
		this.syn = syn;
	}

	public int getTimeout() {
		return timeout;
	}

	public void setTimeout(int timeout) {
		this.timeout = timeout;
	}

	public boolean isClose4sended() {
		return close4sended;
	}

	public void setClose4sended(boolean close4sended) {
		this.close4sended = close4sended;
	}

	public boolean isIssending() {
		return issending;
	}

	public void setIssending(boolean issending) {
		this.issending = issending;
	}

	public ICharFilter getCharFilter() {
		return charFilter;
	}

	public void setCharFilter(ICharFilter charFilter) {
		this.charFilter = charFilter;
	}

	public String getLocalIp() {
		return localIp;
	}

	public void setLocalIp(String localIp) {
		this.localIp = localIp;
	}

	public String getSplitChart() {
		return splitChart;
	}

	public void setSplitChart(String splitChart) {
		this.splitChart = splitChart;
	}

	/** Converts received bytes into the string handed to the application. */
	public interface ICharFilter{
		/**
		 * @param bs bytes received from the socket
		 * @return their string form
		 */
		public String change(byte[] bs);

		/**
		 * @param characater charset name to use for the conversion
		 */
		public void setCharacater(String characater);
	}

	/** Filter that decodes the bytes as text in a named charset. */
	public static class CharacaterFilter implements ICharFilter{

		private static final Logger LOG = LoggerFactory.getLogger(CharacaterFilter.class);

		/** Defaults to UTF-8 so a filter that was never configured does not fail. */
		private String characater = "UTF-8";

		/**
		 * @return the decoded text, or {@code null} when the charset is not supported
		 */
		@Override
		public String change(byte[] bs) {

			try {
				return new String(bs,characater);
			} catch (UnsupportedEncodingException e) {
				LOG.error("不支持的字符集:" + characater, e);
			}
			return null;
		}

		@Override
		public void setCharacater(String characater) {
			this.characater = characater;
		}


	}

	/** Filter that renders the bytes through {@code HexUtil.bytesToHexFun3}. */
	public static class HexFilter implements ICharFilter{

		@Override
		public String change(byte[] bs) {
			return HexUtil.bytesToHexFun3(bs);
		}

		@Override
		public void setCharacater(String characater) {
			// Intentionally empty: this filter does not use a charset.
		}

	}

}




